﻿/******************************************************************************
 * This file is part of libemb.
 *
 * libemb is free software: you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation, either version 3 of the License, or
 * (at your option) any later version.
 *
 * libemb is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with libemb.  If not, see <http://www.gnu.org/licenses/>.
 *
 * Project: Embedme
 * Author : FergusZeng
 * Email  : cblock@126.com
 * git	  : https://git.oschina.net/cblock/embedme
 * Copyright 2014~2017 @ ShenZhen ,China
*******************************************************************************/
#include "MsgQueue.h"
#include "Tracer.h"
#include "Directory.h"

#include <stdlib.h>
#include <sys/mount.h>
#if USE_MQ_SYSV
#else
#include <unistd.h>
#include <time.h>
#endif

#define MAX_MESSAGES    200     /* 最大允许的消息条数 */

MsgQueue::MsgQueue(key_t key):
m_msgID(-1),
m_key(key)
{
}

MsgQueue::~MsgQueue()
{
#if USE_MQ_SYSV
#else
	mq_unlink(getMqName(m_key).c_str()); 
#endif
}

std::string MsgQueue::getMqName(int key)
{
#if USE_MQ_SYSV
    return "";
#else
    char mqName[32]={0};
    sprintf(mqName,"/mq_%x",key);
    return std::string(mqName);
#endif
}


/**
 *  \brief  初始化消息队列
 *  \param  void
 *  \return 成功返回true,失败返回false
 *  \note   none
 */
bool MsgQueue::initialize()
{
#if USE_MQ_SYSV
	m_msgID = msgget(m_key, 0666|IPC_CREAT);
#else
    if(!Directory::isExist(POSIX_MQFS))
    {
        if(!Directory::createDir(POSIX_MQFS))
        {
            return false;
        }
        //CommandPipe::execute("mount -t mqueue none "POSIX_MQFS);
        if(mount("none",POSIX_MQFS,"mqueue",0,NULL)!=0)
        {
            TRACE_ERR_CLASS("mount mqueue error!\n");
            return false;
        }
    }
	mq_unlink(getMqName(m_key).c_str());
	int oflags = O_RDWR|O_CREAT|O_EXCL;
    mode_t mode = S_IRUSR|S_IWUSR|S_IRGRP|S_IROTH;
    struct mq_attr attr;
    attr.mq_maxmsg = 10;
    attr.mq_msgsize = MSG_SIZE_MAX+4;
	m_msgID = (int)mq_open(getMqName(m_key).c_str(),oflags,mode,&attr);
#endif
    if (m_msgID<0)
	{
		TRACE_ERR_CLASS("MsgQueue(key:0x%x) initialize failed:%s.\n",m_key,strerror(errno));
		return false;
	}
	return true;
}

/**
 *  \brief  发送消息
 *  \param  msg 消息体
 *  \return 成功返回STATUS_OK,失败返回STATUS_ERROR
 *  \note   none
 */
int MsgQueue::sendMsg(QueueMsg_S& msg)
{
    int ret = 0;
    if (msg.m_msgType<=0)
    {
        TRACE_ERR_CLASS("msg type must be greater than zero!\n");
        return STATUS_ERROR;
    }
    if (msg.m_dataLen>=MSG_SIZE_MAX)
    {
        TRACE_ERR_CLASS("msg data len must be less than %d!\n",MSG_SIZE_MAX);
        return STATUS_ERROR;
    }
#if USE_MQ_SYSV
	ret = msgsnd(m_msgID, (void*)(&msg), msg.m_dataLen+sizeof(msg.m_dataLen), 0);
#else 
    struct timespec timeout;
    timeout.tv_sec=0;
    timeout.tv_nsec=10;
	ret=mq_timedsend((mqd_t)m_msgID,(const char*)(&msg),MSG_SIZE_MAX+4,0,&timeout);
#endif
    if (ret<0)
	{
		//TRACE_ERR_CLASS("MsgQueue(key:0x%x) send msg(type:%d,datalen:%d) failed.\n",m_key,msg.m_msgType,msg.m_dataLen);
		return STATUS_ERROR;
	}
    return STATUS_OK;
}

/**
 *  \brief  接收消息
 *  \param  msg 消息体
 *  \param  msgType 消息类型
 *  \return 成功返回STATUS_OK,失败返回STATUS_ERROR
 *  \note   none
 */
int MsgQueue::recvMsg(QueueMsg_S& msg, int msgType)
{
    int ret = 0;
#if USE_MQ_SYSV
	ret = msgrcv(m_msgID, (void*)(&msg), MSG_SIZE_MAX, (long)msgType, IPC_NOWAIT|MSG_NOERROR);
#else
    struct timespec timeout;
    timeout.tv_sec=0;
    timeout.tv_nsec=10;
	ret = mq_timedreceive((mqd_t)m_msgID,(char*)(&msg),MSG_SIZE_MAX+4,0,&timeout);
#endif
    if (ret<0)
	{
		//TRACE_ERR_CLASS("MsgQueue(key:0x%x) recieve message(%d) failed(%s).\n",m_key, msgType,ERROR_STRING);
		return STATUS_ERROR;
	}
	return STATUS_OK;
}

/**
 *  \brief  清空消息
 *  \param  msgType 消息体
 *  \return 成功返回STATUS_OK,失败返回STATUS_ERROR
 *  \note   msgType=0 清空队列中所有的消息
 *          msgType>0 清空队列中类型值为msgType的消息
 *          msgType<0 清空队列中类型值小于或等于|msgType|的消息
 */
int MsgQueue::clearMsg(int msgType)
{
    QueueMsg_S msg;
    while(recvMsg(msg, msgType)==STATUS_OK)
    {
        continue;
    }
    return STATUS_OK;
}

/**
 *  \brief  从消息队列工厂取得消息队列
 *  \param  key 消息队列的key
 *  \return MsgQueue* 成功返回消息队列,失败返回NULL
 *  \note   none
 */
MsgQueue* MsgQueueFactory::getMsgQueue(key_t key)
{
    AutoLock lock(&m_lock);
	std::map<key_t, MsgQueue*>::iterator iter = m_msgQueueMap.find(key);
	if (iter!=m_msgQueueMap.end())
	{
		return iter->second;
	}
    else 
    {
        MsgQueue* pMsg = NEW_OBJ MsgQueue(key);
		if (pMsg==NULL)
		{
			return NULL;
		}
		
		if(pMsg->initialize())
		{
			m_msgQueueMap.insert(std::make_pair(key, pMsg));
			return pMsg;
		}
		else
		{
			TRACE_ERR_CLASS("create MsgQueue[key=0x%x] failed!\n",key);
			DEL_OBJ(pMsg);
			return NULL;
		}
	}
}